package com.ld.shieldsb.canalclient.handler.impl.db;

import java.sql.Connection;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.sql.DataSource;

import com.ld.shieldsb.canalclient.exception.CanalException;
import com.ld.shieldsb.canalclient.handler.BaseSyncService;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig.DbMapping;
import com.ld.shieldsb.canalclient.model.SingleDml;
import com.ld.shieldsb.canalclient.model.SyncResult;
import com.ld.shieldsb.canalclient.util.CanalUtil;
import com.ld.shieldsb.canalclient.util.SyncUtil;
import com.ld.shieldsb.common.composition.util.ConvertUtil.MapUtil;

import lombok.extern.slf4j.Slf4j;

/**
 * RDB同步操作业务
 *
 * @author rewerma 2018-11-7 下午06:45:49
 * @version 1.0.0
 */
@Slf4j
public class RdbSyncService extends BaseSyncService {

    // Cache of target-table column types, keyed by the source "destination.database.table" -> <lower-cased columnName, jdbcType>
    private Map<String, Map<String, Integer>> columnsTypeCache;
    private DataSource dataSource;

    private BatchExecutor[] batchExecutors; // 批量执行器

    /**
     * Returns the column-type cache held by this service.
     *
     * @return the cache map itself (the same reference, not a copy)
     */
    public Map<String, Map<String, Integer>> getColumnsTypeCache() {
        return this.columnsTypeCache;
    }

    /**
     * Creates a sync service that starts with a fresh, empty {@link ConcurrentHashMap} as its column-type cache.
     *
     * @param dataSource       data source passed on to the four-argument constructor
     * @param threads          worker count passed on to the four-argument constructor; may be {@code null}
     * @param skipDupException flag passed on to the four-argument constructor
     */
    public RdbSyncService(DataSource dataSource, Integer threads, boolean skipDupException) {
        this(dataSource, threads, new ConcurrentHashMap<String, Map<String, Integer>>(), skipDupException);
    }

    /**
     * Creates a sync service and allocates, for every worker slot, one DML partition list, one
     * {@link BatchExecutor} and one single-thread {@link ExecutorService}.
     *
     * @param dataSource       data source stored on this instance and handed to every {@link BatchExecutor}
     * @param threads          number of worker slots; when {@code null} the value already present in the
     *                         inherited {@code threads} field is used
     * @param columnsTypeCache column-type cache, stored as-is (not copied)
     * @param skipDupException value stored in the inherited {@code skipDupException} field
     * @throws RuntimeException wrapping any exception raised while allocating the per-slot resources
     */
    @SuppressWarnings("unchecked")
    public RdbSyncService(DataSource dataSource, Integer threads, Map<String, Map<String, Integer>> columnsTypeCache,
            boolean skipDupException) {
        super();
        this.dataSource = dataSource;
        this.skipDupException = skipDupException;
        this.columnsTypeCache = columnsTypeCache;
        try {
            // Only override the inherited default when the caller supplied a value.
            if (threads != null) {
                this.threads = threads;
            }
            // Generic array creation is unchecked; the arrays are only ever filled below.
            this.dmlsPartition = new List[this.threads];
            this.batchExecutors = new BatchExecutor[this.threads];
            this.executorThreads = new ExecutorService[this.threads];
            for (int slot = 0; slot < this.threads; slot++) {
                dmlsPartition[slot] = new ArrayList<>();
                batchExecutors[slot] = new BatchExecutor(dataSource);
                executorThreads[slot] = Executors.newSingleThreadExecutor();
            }
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /**
     * Inserts the row carried by {@code dml} into the target table.
     * <p>
     * Only mapped columns that exist in the target table are written; unmatched columns are logged and skipped.
     * Primary-key columns missing from the column mapping are appended via {@code appendPk}. Failures of the
     * INSERT statement itself are caught and reported through the returned result rather than thrown.
     *
     * @param index
     *            slot of the pooled batch executor to use; a negative value runs the statement on a one-off
     *            executor that is closed before this method returns
     * @param config
     *            mapping configuration
     * @param dml
     *            DML data; {@code dml.getData()} holds the row to insert
     * @return the sync result; state {@code HAND_STATE_IGNORE} when there is no data or no matching column,
     *         {@code HAND_STATE_ERROR} when a primary key is unusable or the statement fails
     * @throws SQLException
     *             declared by the overridden method (NOTE(review): presumably raised when obtaining the
     *             connection — confirm against {@code BatchExecutor})
     * @see com.ld.shieldsb.canalclient.handler.BaseSyncService#insert(int, com.ld.shieldsb.canalclient.handler.config.MappingConfig,
     *      com.ld.shieldsb.canalclient.model.SingleDml)
     */
    @Override
    public SyncResult insert(int index, MappingConfig config, SingleDml dml) throws SQLException {
        SyncResult result = new SyncResult();

        // A negative index means a single operation that needs no transaction: use a private executor.
        boolean standalone = index < 0;
        BatchExecutor batchExecutor = standalone ? new BatchExecutor(dataSource, false) : batchExecutors[index];
        try {
            Map<String, Object> data = dml.getData();
            if (data == null || data.isEmpty()) { // nothing to insert
                result.setState(SyncResult.HAND_STATE_IGNORE);
                result.setMsg("数据为空！");
                result.setSuccess(false);
                return result;
            }

            DbMapping dbMapping = config.getDbMapping();

            // With a custom column mapping the primary key may be absent from this map.
            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
            String tableName = SyncUtil.getDbTableName(dbMapping);

            // Column types of the target table, keyed by lower-cased column name.
            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
            boolean caseInsensitive = dbMapping.isCaseInsensitive();

            List<String> insertColumnList = new ArrayList<>(); // target columns that will be written
            List<Map<String, ?>> values = new ArrayList<>();

            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
                String targetColumnName = entry.getKey();
                String srcColumnName = entry.getValue();
                if (srcColumnName == null) {
                    // No explicit source column: fall back to the cleaned target column name.
                    srcColumnName = CanalUtil.cleanColumn(targetColumnName);
                    if (caseInsensitive) {
                        srcColumnName = srcColumnName.toLowerCase();
                    }
                }

                Integer type = ctype.get(CanalUtil.cleanColumn(targetColumnName).toLowerCase());
                if (type == null) { // column does not exist in the target table
                    log.warn(getNotMatchColumnMsg(config, targetColumnName));
                } else {
                    insertColumnList.add(targetColumnName);
                    BatchExecutor.setValue(values, type, data.get(srcColumnName));
                }
            }
            if (insertColumnList.isEmpty()) {
                log.warn("数据库中没有任何匹配的字段可以进行插入操作！！！");
                result.setState(SyncResult.HAND_STATE_IGNORE);
                result.setMsg("没有匹配的字段！");
                result.setSuccess(false);
                return result;
            }

            // Make sure every primary-key column is part of the statement.
            try {
                appendPk(config, insertColumnList, ctype, values, data);
            } catch (CanalException e) {
                result.setSuccess(false);
                result.setState(SyncResult.HAND_STATE_ERROR);
                result.setMsg(e.getMessage());
                return result;
            }

            // Build "INSERT INTO t (`a`,`b`) VALUES (?,?)" from the collected columns.
            StringBuilder insertSql = new StringBuilder();
            insertSql.append("INSERT INTO ").append(tableName).append(" (");
            StringBuilder placeholders = new StringBuilder();
            for (String columnName : insertColumnList) {
                insertSql.append("`").append(columnName).append("`").append(",");
                placeholders.append("?,");
            }
            // insertColumnList is non-empty here, so both builders end with a trailing comma.
            insertSql.setLength(insertSql.length() - 1);
            placeholders.setLength(placeholders.length() - 1);
            insertSql.append(") VALUES (").append(placeholders).append(")");

            if (log.isTraceEnabled()) {
                log.trace("插入到目标表, sql: {}", insertSql);
            }
            try {
                batchExecutor.execute(insertSql.toString(), values);
            } catch (Exception e) {
                log.warn("", e);
                // Report the failure through the result instead of propagating it.
                String msg = e.getMessage();
                if (msg == null) {
                    // getMessage() may be null (e.g. NullPointerException); fall back to the exception text.
                    msg = e.toString();
                } else if (msg.contains("Duplicate entry")) {
                    msg = "信息已存在!";
                }
                // TODO add duplicate-key detection for more databases (e.g. Oracle "ORA-00001:")
                result.setSuccess(false);
                result.setState(SyncResult.HAND_STATE_ERROR);
                result.setMsg(msg);
                return result;
            }
            return result;
        } finally {
            // The one-off executor is unreachable after this method, so release it here.
            // NOTE(review): assumes BatchExecutor.close() only releases the connection — confirm.
            if (standalone) {
                batchExecutor.close();
            }
        }
    }

    /**
     * Appends every configured primary-key column that is not yet part of the INSERT column list,
     * together with its value.
     *
     * @param config
     *            mapping configuration; its {@code DbMapping.getTargetPk()} maps target PK column to source column
     * @param insertColumnList
     *            target columns collected so far; missing PK columns are added to it
     * @param ctype
     *            target column types keyed by lower-cased column name
     * @param values
     *            bound values, kept parallel to {@code insertColumnList}
     * @param datas
     *            row data the PK values are read from
     * @throws CanalException
     *             when a PK column does not exist in the target table or its value is {@code null}
     */
    private void appendPk(MappingConfig config, List<String> insertColumnList, Map<String, Integer> ctype, List<Map<String, ?>> values,
            Map<String, Object> datas) throws CanalException {
        MappingConfig.DbMapping dbMapping = config.getDbMapping();
        boolean caseInsensitive = dbMapping.isCaseInsensitive();
        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
            String targetColumnName = entry.getKey();
            if (insertColumnList.contains(targetColumnName)) {
                continue; // already written through the regular column mapping
            }

            String srcColumnName = entry.getValue();
            if (srcColumnName == null) {
                // No explicit source column: fall back to the cleaned target column name.
                srcColumnName = CanalUtil.cleanColumn(targetColumnName);
                if (caseInsensitive) {
                    srcColumnName = srcColumnName.toLowerCase();
                }
            }

            Integer type = ctype.get(CanalUtil.cleanColumn(targetColumnName).toLowerCase());
            if (type == null) {
                // A PK column missing from the target table cannot be inserted; fail with a clear
                // message instead of binding a value with a null type.
                log.warn(getNotMatchColumnMsg(config, targetColumnName));
                throw new CanalException("主键" + targetColumnName + "在目标表中不存在！");
            }

            Object pkValue = datas == null ? null : datas.get(srcColumnName);
            if (pkValue == null) {
                throw new CanalException("主键" + srcColumnName + "值为空！");
            }
            insertColumnList.add(targetColumnName);
            BatchExecutor.setValue(values, type, pkValue);
        }
    }

    /**
     * Updates the target row for the columns that changed.
     * <p>
     * Only source columns present in {@code dml.getOld()} are written; each is translated to its target
     * column(s) through the column mapping, and target columns missing from the target table are logged
     * and skipped. The WHERE clause is built from the primary key, preferring old values when present.
     * Failures of the UPDATE statement itself are caught and reported through the returned result.
     *
     * @param index
     *            slot of the pooled batch executor to use; a negative value runs the statement on a one-off
     *            executor that is closed before this method returns
     * @param config
     *            mapping configuration
     * @param dml
     *            DML data; {@code getData()} is the row after the change, {@code getOld()} the changed columns' old values
     * @return the sync result; state {@code HAND_STATE_IGNORE} when there is no data or no matching column,
     *         {@code HAND_STATE_ERROR} when the primary key is unusable or the statement fails
     * @throws SQLException
     *             declared by the overridden method (NOTE(review): presumably raised when obtaining the
     *             connection — confirm against {@code BatchExecutor})
     */
    @Override
    public SyncResult update(int index, MappingConfig config, SingleDml dml) throws SQLException {
        SyncResult result = SyncResult.builder().build();

        // A negative index means a single operation that needs no transaction: use a private executor.
        boolean standalone = index < 0;
        BatchExecutor batchExecutor = standalone ? new BatchExecutor(dataSource, false) : batchExecutors[index];
        try {
            Map<String, Object> data = dml.getData(); // full row after the change
            Map<String, Object> old = dml.getOld(); // old values of the changed columns
            if (MapUtil.isEmpty(data) || MapUtil.isEmpty(old)) { // nothing to update
                result.setState(SyncResult.HAND_STATE_IGNORE);
                result.setMsg("数据为空！");
                result.setSuccess(false);
                return result;
            }

            DbMapping dbMapping = config.getDbMapping();
            // target column -> source column
            Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);

            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

            StringBuilder updateSql = new StringBuilder();
            updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
            List<Map<String, ?>> values = new ArrayList<>();
            boolean hasMatched = false;
            for (String srcColumnName : old.keySet()) { // only the changed source columns
                for (Map.Entry<String, String> mapping : columnsMap.entrySet()) {
                    if (!srcColumnName.equalsIgnoreCase(mapping.getValue())) {
                        continue; // this target column is fed by a different source column
                    }
                    String targetColumnName = mapping.getKey();
                    Integer type = ctype.get(CanalUtil.cleanColumn(targetColumnName).toLowerCase());
                    if (type == null) { // column does not exist in the target table
                        log.warn(getNotMatchColumnMsg(config, targetColumnName));
                    } else {
                        hasMatched = true;
                        updateSql.append("`").append(targetColumnName).append("`").append("=?, ");
                        BatchExecutor.setValue(values, type, data.get(srcColumnName));
                    }
                }
            }
            if (!hasMatched) {
                log.warn("数据库中没有任何匹配的字段可以进行更新操作！！！");
                result.setState(SyncResult.HAND_STATE_IGNORE);
                result.setMsg("没有匹配的字段！");
                result.setSuccess(false);
                return result;
            }
            // Drop the trailing ", " left by the last SET assignment.
            int len = updateSql.length();
            updateSql.delete(len - 2, len).append(" WHERE ");

            // Append the primary-key condition.
            try {
                appendCondition(config, updateSql, ctype, values, data, old);
            } catch (CanalException e) {
                result.setState(SyncResult.HAND_STATE_ERROR);
                result.setMsg(e.getMessage());
                result.setSuccess(false);
                return result;
            }
            try {
                batchExecutor.execute(updateSql.toString(), values);
            } catch (Exception e) {
                log.warn("", e);
                // Report the failure through the result instead of propagating it.
                String msg = e.getMessage();
                if (msg == null) {
                    // getMessage() may be null (e.g. NullPointerException); fall back to the exception text.
                    msg = e.toString();
                } else if (msg.contains("Duplicate entry")) {
                    msg = "信息已存在!";
                }
                // TODO add duplicate-key detection for more databases (e.g. Oracle "ORA-00001:")
                result.setSuccess(false);
                result.setState(SyncResult.HAND_STATE_ERROR);
                result.setMsg(msg);
                return result;
            }
            if (log.isTraceEnabled()) {
                log.trace("更新目标表, sql: {}", updateSql);
            }
            return result;
        } finally {
            // The one-off executor is unreachable after this method, so release it here.
            // NOTE(review): assumes BatchExecutor.close() only releases the connection — confirm.
            if (standalone) {
                batchExecutor.close();
            }
        }
    }

    /**
     * Builds the log text used when a target column has no counterpart in the target table.
     *
     * @param config
     *            mapping configuration supplying the destination and the source/target table names
     * @param targetColumnName
     *            the target column that could not be matched
     * @return the message, made of the source table ({@code destination.database.table}), the column name
     *         and the target table ({@code targetDb.targetTable})
     */
    private String getNotMatchColumnMsg(MappingConfig config, String targetColumnName) {
        DbMapping mapping = config.getDbMapping();
        StringBuilder text = new StringBuilder();
        // source side: destination.database.table
        text.append(config.getDestination()).append(".").append(mapping.getDatabase()).append(".").append(mapping.getTable());
        text.append("目标字段: ").append(targetColumnName);
        // target side: targetDb.targetTable
        text.append(" 在目标表 ").append(mapping.getTargetDb()).append(".").append(mapping.getTargetTable());
        text.append(" 中没有匹配的字段，忽略！");
        return text.toString();
    }

    /**
     * Deletes the target row identified by the primary-key values found in {@code dml.getData()}.
     *
     * @param index
     *            slot of the pooled batch executor to use; a negative value runs the statement on a one-off
     *            executor that is closed before this method returns
     * @param config
     *            mapping configuration
     * @param dml
     *            DML data; {@code getData()} must contain the primary-key values
     * @return the sync result; state {@code HAND_STATE_IGNORE} when there is no data,
     *         {@code HAND_STATE_ERROR} when the primary-key condition cannot be built
     * @throws SQLException
     *             when executing the DELETE statement fails
     */
    @Override
    public SyncResult delete(int index, MappingConfig config, SingleDml dml) throws SQLException {
        SyncResult result = SyncResult.builder().build();

        // A negative index means a single operation that needs no transaction: use a private executor.
        boolean standalone = index < 0;
        BatchExecutor batchExecutor = standalone ? new BatchExecutor(dataSource, false) : batchExecutors[index];
        try {
            Map<String, Object> data = dml.getData();
            if (data == null || data.isEmpty()) { // nothing identifies the row
                result.setState(SyncResult.HAND_STATE_IGNORE);
                result.setMsg("数据为空！");
                result.setSuccess(false);
                return result;
            }

            DbMapping dbMapping = config.getDbMapping();

            Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

            StringBuilder sql = new StringBuilder();
            sql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");

            List<Map<String, ?>> values = new ArrayList<>();
            // Append the primary-key condition.
            try {
                appendCondition(config, sql, ctype, values, data);
            } catch (CanalException e) {
                result.setState(SyncResult.HAND_STATE_ERROR);
                result.setMsg(e.getMessage());
                result.setSuccess(false);
                return result;
            }
            batchExecutor.execute(sql.toString(), values);
            if (log.isTraceEnabled()) {
                log.trace("删除目标表数据, sql: {}", sql);
            }
            return result;
        } finally {
            // The one-off executor is unreachable after this method, so release it here.
            // NOTE(review): assumes BatchExecutor.close() only releases the connection — confirm.
            if (standalone) {
                batchExecutor.close();
            }
        }
    }

    /**
     * Truncates the target table named by the mapping configuration.
     *
     * @param index
     *            slot of the pooled batch executor to use; a negative value runs the statement on a one-off
     *            executor that is closed before this method returns
     * @param config
     *            mapping configuration
     * @return a freshly built sync result
     * @throws SQLException
     *             when executing the TRUNCATE statement fails
     */
    @Override
    public SyncResult truncate(int index, MappingConfig config) throws SQLException {
        // A negative index means a single operation that needs no transaction: use a private executor.
        boolean standalone = index < 0;
        BatchExecutor batchExecutor = standalone ? new BatchExecutor(dataSource, false) : batchExecutors[index];
        try {
            DbMapping dbMapping = config.getDbMapping();
            StringBuilder sql = new StringBuilder();
            sql.append("TRUNCATE TABLE ").append(SyncUtil.getDbTableName(dbMapping));
            batchExecutor.execute(sql.toString(), new ArrayList<>());
            if (log.isTraceEnabled()) {
                log.trace("清空目标表, sql: {}", sql);
            }
            return SyncResult.builder().build();
        } finally {
            // The one-off executor is unreachable after this method, so release it here.
            // NOTE(review): assumes BatchExecutor.close() only releases the connection — confirm.
            if (standalone) {
                batchExecutor.close();
            }
        }
    }

    /**
     * Returns the JDBC types of the target table's columns, loading them on a cache miss.
     * <p>
     * The cache key is {@code destination.database.table}. On a miss the lookup is repeated while holding
     * the {@code RdbSyncService.class} monitor; if it is still absent, a {@code SELECT * ... WHERE 1=2}
     * is run against the target table and the result-set metadata is read.
     *
     * @param conn
     *            SQL connection used for the metadata query
     * @param config
     *            mapping configuration
     * @return map of lower-cased column name to {@code java.sql.Types} value; when reading the metadata
     *         raises an {@link SQLException} the error is logged and the partially filled map is returned
     *         without being cached
     */
    private Map<String, Integer> getTargetColumnType(Connection conn, MappingConfig config) {
        final DbMapping mapping = config.getDbMapping();
        final String key = config.getDestination() + "." + mapping.getDatabase() + "." + mapping.getTable();

        // Fast path: already cached.
        Map<String, Integer> cached = columnsTypeCache.get(key);
        if (cached != null) {
            return cached;
        }

        synchronized (RdbSyncService.class) {
            // Another thread may have filled the cache while this one waited for the monitor.
            cached = columnsTypeCache.get(key);
            if (cached != null) {
                return cached;
            }

            final Map<String, Integer> loaded = new LinkedHashMap<>();
            // "WHERE 1=2" selects no rows; only the metadata is of interest.
            String probeSql = "SELECT * FROM " + SyncUtil.getDbTableName(mapping) + " WHERE 1=2";
            CanalUtil.sqlRS(conn, probeSql, rs -> {
                try {
                    ResultSetMetaData meta = rs.getMetaData();
                    int total = meta.getColumnCount();
                    for (int col = 1; col <= total; col++) { // JDBC column indexes are 1-based
                        loaded.put(meta.getColumnName(col).toLowerCase(), meta.getColumnType(col));
                    }
                    // Publish only after every column has been read successfully.
                    columnsTypeCache.put(key, loaded);
                } catch (SQLException e) {
                    log.error(e.getMessage(), e);
                }
            });
            return loaded;
        }
    }

    /**
     * Appends the primary-key WHERE condition using only the current row data (no old values).
     *
     * @param config
     *            mapping configuration
     * @param sql
     *            statement under construction; the condition is appended to it
     * @param ctype
     *            target column types keyed by lower-cased column name
     * @param values
     *            bound values; one entry is added per primary-key column
     * @param datas
     *            row data the primary-key values are read from
     * @throws CanalException
     *             propagated from the six-argument overload
     */
    private void appendCondition(MappingConfig config, StringBuilder sql, Map<String, Integer> ctype, List<Map<String, ?>> values,
            Map<String, Object> datas) throws CanalException {
        final Map<String, Object> noOldValues = null;
        appendCondition(config, sql, ctype, values, datas, noOldValues);
    }

    /**
     * Appends the primary-key WHERE condition ({@code `pk1`=? AND `pk2`=?}) to {@code sql} and adds the
     * matching bound values.
     * <p>
     * For each primary-key column the value is taken from {@code olds} when it contains the source
     * column (the key itself was changed), otherwise from {@code datas}.
     *
     * @param config
     *            mapping configuration; its {@code DbMapping.getTargetPk()} maps target PK column to source column
     * @param sql
     *            statement under construction, expected to end with {@code " WHERE "}
     * @param ctype
     *            target column types keyed by lower-cased column name
     * @param values
     *            bound values; one entry is added per primary-key column
     * @param datas
     *            current row data
     * @param olds
     *            old values of changed columns; may be {@code null}
     * @throws CanalException
     *             when no primary key is configured, a primary-key column does not exist in the target
     *             table, or a primary-key value is {@code null}
     */
    private void appendCondition(MappingConfig config, StringBuilder sql, Map<String, Integer> ctype, List<Map<String, ?>> values,
            Map<String, Object> datas, Map<String, Object> olds) throws CanalException {
        MappingConfig.DbMapping dbMapping = config.getDbMapping();
        boolean caseInsensitive = dbMapping.isCaseInsensitive();

        Map<String, String> targetPk = dbMapping.getTargetPk();
        if (targetPk == null || targetPk.isEmpty()) {
            // Without a key the trailing-"AND " cleanup below would cut into the WHERE keyword and
            // produce malformed SQL, so refuse up front.
            throw new CanalException("未配置主键，无法拼接where条件！");
        }

        for (Map.Entry<String, String> entry : targetPk.entrySet()) {
            String targetColumnName = entry.getKey();
            String srcColumnName = entry.getValue();
            if (srcColumnName == null) {
                // No explicit source column: fall back to the cleaned target column name.
                srcColumnName = CanalUtil.cleanColumn(targetColumnName);
                if (caseInsensitive) {
                    srcColumnName = srcColumnName.toLowerCase();
                }
            }

            Integer type = ctype.get(CanalUtil.cleanColumn(targetColumnName).toLowerCase());
            if (type == null) {
                // A PK column missing from the target table cannot be used in a condition; fail with
                // a clear message instead of binding a value with a null type.
                log.warn(getNotMatchColumnMsg(config, targetColumnName));
                throw new CanalException("主键" + targetColumnName + "在目标表中不存在！");
            }

            // Prefer the old value when the key itself was modified, otherwise use the current one.
            Object pkValue;
            if (olds != null && olds.containsKey(srcColumnName)) {
                pkValue = olds.get(srcColumnName);
            } else {
                pkValue = datas == null ? null : datas.get(srcColumnName);
            }
            if (pkValue == null) {
                throw new CanalException("主键" + srcColumnName + "值为空！");
            }

            sql.append("`").append(targetColumnName).append("`").append("=? AND ");
            BatchExecutor.setValue(values, type, pkValue);
        }
        // Remove the trailing "AND " left by the last key column.
        int len = sql.length();
        sql.delete(len - 4, len);
    }

    /**
     * Closes this service by delegating to the superclass's {@code closeResources()}.
     */
    @Override
    public void close() {
        super.closeResources();
    }

    /**
     * Callback invoked after a successful submit: commits the batch executor of the given slot.
     *
     * @param index
     *            slot of the batch executor to commit
     * @throws SQLException
     *             when the commit fails
     */
    @Override
    public void submitSuccess(int index) throws SQLException {
        BatchExecutor executor = batchExecutors[index];
        executor.commit();
    }

    /**
     * Callback invoked after a failed submit: rolls back the batch executor of the given slot.
     *
     * @param index
     *            slot of the batch executor to roll back
     * @throws SQLException
     *             when the rollback fails
     */
    @Override
    public void submitError(int index) throws SQLException {
        BatchExecutor executor = batchExecutors[index];
        executor.rollback();
    }

    /**
     * Callback invoked once synchronisation has completed: closes every non-null pooled batch executor.
     */
    @Override
    public void syncComplete() {
        for (BatchExecutor executor : batchExecutors) {
            if (executor == null) {
                continue; // slot was never populated
            }
            executor.close();
        }
    }

}
